package com.lianda.connectors.source;

import com.lianda.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Custom Flink source that reads every row of the {@code Student} table from MySQL
 * and emits each row as a {@link Student}.
 *
 * <p>The JDBC connection and prepared statement are created in
 * {@link #open(Configuration)} and released in {@link #close()}. Each call to
 * {@link #run} executes the query once and returns when the result set is exhausted
 * or {@link #cancel()} has been called.
 */
public class SourceFromMySQL extends RichSourceFunction<Student> {

    // JDBC handles are created in open(); transient so they are never part of the
    // serialized form of this function.
    transient PreparedStatement ps;
    private transient Connection connection;

    // Cleared by cancel() so the read loop in run() can stop early. Volatile because
    // cancel() may be invoked from a different thread than run().
    // Deliberately NOT transient: the initial value must survive serialization.
    private volatile boolean running = true;

    /**
     * Opens the JDBC connection and prepares the query.
     *
     * @param parameters configuration passed through to {@code super.open}
     * @throws Exception if the driver cannot be loaded, the connection cannot be
     *                   established, or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from Student;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * Releases the JDBC resources: the statement first, then the connection that
     * owns it. Each step runs even if an earlier one throws.
     *
     * @throws Exception if {@code super.close()} or closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                if (ps != null) {
                    ps.close();
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }

    /**
     * Executes the prepared query and emits one {@link Student} per row, built from
     * the {@code id}, {@code name}, {@code password} and {@code age} columns.
     *
     * @param ctx context used to emit the records
     * @throws Exception if executing the query or reading a row fails
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // try-with-resources so the result set is closed on every exit path.
        try (ResultSet rs = ps.executeQuery()) {
            while (running && rs.next()) {
                Student student = new Student(
                        rs.getInt("id"),
                        trimOrNull(rs.getString("name")),
                        trimOrNull(rs.getString("password")),
                        rs.getInt("age")
                );
                ctx.collect(student);
            }
        }
    }

    /**
     * Requests that {@link #run} stop emitting; the read loop checks the flag before
     * fetching each row.
     */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Trims a column value, passing SQL NULL (Java {@code null}) through unchanged
     * instead of throwing a {@link NullPointerException}.
     */
    private static String trimOrNull(String value) {
        return value == null ? null : value.trim();
    }

    /**
     * Loads the MySQL driver and opens a connection.
     *
     * <p>Failures are propagated to the caller rather than printed and converted
     * into a {@code null} connection, so the real cause is reported.
     *
     * @return an open connection, never {@code null}
     * @throws ClassNotFoundException if the MySQL driver class is not on the classpath
     * @throws SQLException           if the connection cannot be established
     */
    private static Connection getConnection() throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        // NOTE(review): URL and credentials are hard-coded; consider supplying them
        // through configuration instead of source code.
        return DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8",
                "root",
                "123456");
    }
}
